package com.traffic.visim.kafkamq.listener;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.traffic.visim.kafkamq.constant.KafkaConstant;
import com.traffic.visim.log.entity.*;
import com.traffic.visim.log.service.LogService;
import com.traffic.visim.onvif.dto.ImageInfo;
import com.traffic.visim.service.OnvifService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.Map;
import java.util.Optional;


/**
 * @description: kafka消费
 * @author: mipengchong
 * @create: 2019-09-20 14:19
 **/
@Slf4j
@Component
public class KafkaConsumerListener {

    @Autowired
    private OnvifService onvifService;
    @Autowired
    private LogService logService;

    /**
     * 业务处理 //TODO topic 需要统一配置
     * 配置文件参见：配置文件
     *
     * @param @KafkaListener(topics = "#{'${spring.kafka.listener.topics}'.split(',')}")
     * @param record
     * @KafkaListener(groupId = "${spring.kafka.consumer.group-id}",topics = {KafkaTopic.JIEKE_DMP_CROWD_COMPUTE, KafkaTopic.JIEKE_DMP_CROWD_PORTRAIT_COMPUTE, KafkaTopic.JIEKE_DMP_CROWD_UPLOAD, KafkaTopic.JIEKE_DMP_CROWD_TO_PORTRAIT_COMPUTE})
     */
    @KafkaListener(
            //groupId = "${spring.kafka.consumer.group-id}",
            topics = "#{'${spring.kafka.listener.topics}'.split(',')}")
    public void consume(ConsumerRecord<?, String> record) {

        Long offset = record.offset();
        Optional<String> optional = Optional.ofNullable(record.value());
        if (optional.isPresent()) {
            String value = optional.get();
            //tag
            Object key = record.key();

            //topic
            String topic = record.topic();

            log.info("接受消息  topic: [{}] key ,[{}],value: [{}]，offset : {} ", topic, key, value, offset);
            if ("event".equals(key)&&topic.equals(KafkaConstant.KAFKA_TRAFFIC_TOPIC))
            {
                Map map= JSON.parseObject(value);
                String ip= (String) map.get("ip");
                Integer type= (Integer) map.get("type");
                onvifService.sendEvent(ip,type);
            }

            if ("image".equals(key)&&topic.equals(KafkaConstant.KAFKA_TRAFFIC_TOPIC))
            {
                ImageInfo info= JSONObject.parseObject(value,ImageInfo.class);
                onvifService.createImage(info);
            }
            if ("sendLog".equals(key)&&topic.equals(KafkaConstant.KAFKA_TRAFFIC_TOPIC)){
                SendLog sendLog = JSONObject.parseObject(value, SendLog.class);
                if (sendLog.getType() == 0){
                    ExceptionIp exceptionIp = ExceptionIp.builder().createTime(new Date()).ip(sendLog.getIp()).type(sendLog.getType()).build();
                    ExceptionFlow exceptionFlow = ExceptionFlow.builder().createTime(new Date()).ip(sendLog.getIp()).type(sendLog.getType()).build();
                    if (logService.findIp(sendLog.getIp())){
                        logService.saveExceptionFlow(exceptionFlow);
                    }
                    else {
                        logService.saveExceptionFlow(exceptionFlow);
                        logService.saveExceptionIp(exceptionIp);
                    }
                }else{
                    ExceptionPermission exceptionPermission = ExceptionPermission.builder().createTime(new Date()).type(sendLog.getType()).build();
                    logService.saveExceptionPermission(exceptionPermission);
                }
            }
        }

    }
}
